{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "1. Create a Spark session\n",
    "2. Add the iceberg-runtime Jar"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Waiting for a Spark session to start..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "\n",
       "<ul>\n",
       "<li><a href=\"http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\" target=\"new_tab\">Spark UI</a></li>\n",
       "<li><a href=\"http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\" target=\"new_tab\">Hadoop app: application_1506444763486_1280232</a></li>\n",
       "<li>Local logs are available using %tail_log</li>\n",
       "<li>Local logs are at: /data/tmp/jobs/20180110_171610.028692.log</li>\n",
       "</ul>\n"
      ],
      "text/plain": [
       "\n",
       "Spark application_1506444763486_1280232:\n",
       "* http://hadoop-historyserver:20888/proxy/application_1506444763486_1280232\n",
       "* http://hadoop-resourcemanager:8088/cluster/app/application_1506444763486_1280232\n",
       "\n",
       "Local logs:\n",
       "* /data/tmp/genie/jobs/20180110_171610.028692.log\n",
       "* Also available using %tail_log\n"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting download from file:///home/user/iceberg-runtime-0.1.3.jar\n",
      "Finished download of iceberg-runtime-0.1.3.jar\n"
     ]
    }
   ],
   "source": [
    "%AddJar file:///home/user/iceberg-runtime-0.1.3.jar"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Drop and re-create a temporary Iceberg table in HDFS\n",
    "\n",
    "The new table's schema and partition spec are derived from the existing table `default.job_metrics`.\n",
    "\n",
    "[Spark Schema Helpers](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/spark/SparkSchemaUtil.html)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "path = hdfs:/tmp/tables/job_metrics_tmp\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "table {\n",
       "  1: event_utc_ms: optional long\n",
       "  2: hostname: optional string\n",
       "  3: jobflow: optional string\n",
       "  4: job_name: optional string\n",
       "  5: application_type: optional string\n",
       "  6: record_id: optional string\n",
       "  7: record_type: optional string\n",
       "  8: user: optional string\n",
       "  9: submit_time: optional long\n",
       "  10: start_time: optional long\n",
       "  11: finish_time: optional long\n",
       "  12: run_host: optional string\n",
       "  13: submit_host: optional string\n",
       "  14: status: optional string\n",
       "  15: cluster_id: optional string\n",
       "  16: cluster_name: optional string\n",
       "  17: queue: optional string\n",
       "  18: genie_job_name: optional string\n",
       "  19: genie_job_id: optional string\n",
       "  20: job_uuid: optional string\n",
       "  21: counters: optional string\n",
       "  22: properties: optional string\n",
       "  23: dateint: optional int\n",
       "  24: hour: optional int\n",
       "  25: batchid: optional string\n",
       "}"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.hadoop.fs.Path\n",
    "import org.apache.iceberg.hadoop.HadoopTables\n",
    "import org.apache.iceberg.spark.SparkSchemaUtil\n",
    "\n",
    "val path = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
    "\n",
    "{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
    "\n",
    "    // remove the temp table if it already exists\n",
    "    val conf = spark.sessionState.newHadoopConf()\n",
    "    val fs = new Path(path).getFileSystem(conf)\n",
    "    fs.delete(new Path(path), true /* recursive */ )\n",
    "\n",
    "    // create the temp table using Spark utils to create a schema and partition spec\n",
    "    val tables = new HadoopTables(conf)\n",
    "    val schema = SparkSchemaUtil.schemaForTable(spark, \"default.job_metrics\")\n",
    "    val spec = SparkSchemaUtil.specForTable(spark, \"default.job_metrics\")\n",
    "\n",
    "    tables.create(schema, spec, path)\n",
    "\n",
    "    // show the schema\n",
    "    tables.load(path).schema\n",
    "}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load table partitions as a DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>partition</th><th>uri</th><th>format</th></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 0, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 1, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 2, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 3, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 4, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 5, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 6, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 7, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 8, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "<tr><td>{dateint -> 20170316, hour -> 9, batchid -> merged_1}</td><td>s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1</td><td>org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe</td></tr>\n",
       "</table>"
      ],
      "text/plain": [
       "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n",
       "| partition                                             | uri                                                                                                                  | format                                                      |\n",
       "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+\n",
       "| {dateint -> 20170316, hour -> 0, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=0/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 1, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=1/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 2, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=2/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 3, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=3/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 4, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=4/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 5, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=5/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 6, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=6/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 7, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=7/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 8, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=8/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "| {dateint -> 20170316, hour -> 9, batchid -> merged_1} | s3n://bucket/hive/warehouse/job_metrics/dateint=20170316/hour=9/batchid=merged_1 | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe |\n",
       "+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "partitions = [partition: map<string,string>, uri: string ... 1 more field]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[partition: map<string,string>, uri: string ... 1 more field]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.iceberg.spark.SparkTableUtil\n",
    "\n",
    "// get a data frame with the table's partitions\n",
    "val partitions = SparkTableUtil.partitionDF(spark, \"default.job_metrics\")\n",
    "                               .filter($\"format\".contains(\"parquet\") || $\"format\".contains(\"avro\"))\n",
    "\n",
    "display(partitions.limit(10))"
   ]
  },
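  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# List data files, read their metrics, and append them to the table\n",
    "\n",
    "Partitions are listed in parallel, reading Parquet footers to collect file metrics. The resulting files are then regrouped into 10 tasks, and each task commits its files to the Iceberg table with a fast append.\n",
    "\n",
    "* [Table API](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/Table.html)\n",
    "* [Append API](https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/AppendFiles.html)"
   ]
  },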
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Stage 3:====================================================>     (9 + 1) / 10]"
     ]
    },
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.iceberg.hadoop.HadoopTables\n",
    "import org.apache.hadoop.conf.Configuration\n",
    "\n",
    "partitions.repartition(100).flatMap { row =>\n",
    "\n",
    "    // list the partition and read Parquet footers to get metrics\n",
    "    SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))\n",
    "\n",
    "}.repartition(10) // avoid lots of manifests that would be merged later\n",
    " .mapPartitions { files =>\n",
    "\n",
    "    // open the table and append the files from this partition\n",
    "    val tables = new HadoopTables(new Configuration())\n",
    "    val table = tables.load(\"hdfs:/tmp/tables/job_metrics_tmp\")\n",
    "\n",
    "    // fast appends will create a manifest for the new files\n",
    "    val append = table.newFastAppend\n",
    "\n",
    "    files.foreach { file =>\n",
    "        append.appendFile(file.toDataFile(table.spec))\n",
    "    }\n",
    "\n",
    "    // commit the new files\n",
    "    append.commit()\n",
    "\n",
    "    Seq.empty[String].iterator\n",
    "\n",
    "}.count\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Inspect the results\n",
    "\n",
    "[Snapshot API](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/Snapshot.html)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "tables = org.apache.iceberg.hadoop.HadoopTables@1782cb95\n",
       "table = hdfs:/tmp/tables/job_metrics_tmp\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "BaseSnapshot{id=1515605124481, timestamp_ms=1515605127199, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/695d8ab7-961c-4cef-94d7-367db5d8f7de-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/725154b3-92bd-4d00-9420-34a2866f2876-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/266e6040-d8ff-4713-92cb-0d806c7a3baf-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/3b0e9c88-03b0-4032-bf70-f9af43e00034-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/0747127e-895e-492e-b07e-a54627ee5534-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/db055992-1bf1-4fe7-a851-1eff0a05af55-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/1d5b7cb9-85bd-4088-ad26-a4e9562ad181-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/36db4143-8720-4060-9a8d-d17fa7dcf46f-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/46a079c3-8654-4ed5-9466-088320bda559-m0.avro, hdfs:/tmp/tables/job_metrics_tmp/metadata/f239498c-7386-4f31-8421-518105ffbf6a-m0.avro]}"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val tables = new HadoopTables(spark.sessionState.newHadoopConf())\n",
    "val table = tables.load(path)\n",
    "\n",
    "table.currentSnapshot"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "7087"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import scala.collection.JavaConverters._\n",
    "\n",
    "table.currentSnapshot.addedFiles.asScala.size"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "table.newAppend.commit // use a merge commit to create a single manifest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "BaseSnapshot{id=1515605215920, timestamp_ms=1515605220253, manifests=[hdfs:/tmp/tables/job_metrics_tmp/metadata/213364b0-d97f-49bf-9126-7273b9784cfb-m0.avro]}"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.currentSnapshot"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "hide_input": false,
  "kernelspec": {
   "display_name": "Spark 2.0.0 - Scala 2.11",
   "language": "scala",
   "name": "spark2-scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
